AWS Step FunctionsステートマシンでDynamoDBテーブルからアイテムを取得してMapステートで処理する(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
前回のエントリでは、Step Functions Workflow Studioを使用して、DynamoDBテーブルをQueryして取得したアイテムをMapステートで処理(SMSメッセージを送信)するステートマシンを作成しました。
今回は、同じくAWS Step FunctionsのステートマシンからDynamoDBテーブルをQueryして取得したアイテムをMapステートで処理(SMSメッセージを送信)する構成を、AWS CDKでで作ってみました。
作るもの
下記のようなグラフの定義のステートマシンを作成します。queryCrewsTable
からDynamoDBテーブルをQueryし、Mapステート内のsendingSMSTasks
からAmazon SNSでSMSメッセージを送信します。
やってみた
対象のDynamoDBテーブル
下記のDynamoDBテーブルをクエリ対象とします。
- テーブル名:
crews
- PK:
crewId
(String) - GSI名:
areaId-index
- GSI-PK:
areaId
(String)
格納されているデータは以下となります。
CDKコード
import * as cdk from '@aws-cdk/core'; import * as sfn from '@aws-cdk/aws-stepfunctions'; import * as tasks from '@aws-cdk/aws-stepfunctions-tasks'; export class AwsCdkAppStack extends cdk.Stack { constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); const accountId = cdk.Stack.of(this).account; const region = cdk.Stack.of(this).region; const crewsTableName = 'crews'; const crewsTableAreaIdIndex = 'areaId-index'; //Crewsテーブルデータ取得タスク const queryCrewsTable = new tasks.CallAwsService(this, 'queryCrewsTable', { service: 'dynamodb', action: 'query', parameters: { TableName: crewsTableName, IndexName: crewsTableAreaIdIndex, ExpressionAttributeNames: { '#areaId': 'areaId', }, ExpressionAttributeValues: { ':areaId': { 'S.$': '$.areaId', }, }, KeyConditionExpression: '#areaId = :areaId', }, iamResources: [ `arn:aws:dynamodb:${region}:${accountId}:table/${crewsTableName}/index/${crewsTableAreaIdIndex}`, ], iamAction: 'dynamodb:Query', resultSelector: { 'Items.$': '$.Items', }, resultPath: sfn.JsonPath.stringAt('$.crews'), }); //Mapステート const sendingMap = new sfn.Map(this, 'sendingMap', { itemsPath: sfn.JsonPath.stringAt('$.crews.Items'), parameters: { 'phoneNumber.$': '$$.Map.Item.Value.phoneNumber.S', 'message.$': '$.message', 'senderId.$': '$.senderId', }, }); //SMS送信タスク const sendingSMSTask = new tasks.CallAwsService(this, 'sendingSMSTask', { service: 'sns', action: 'publish', parameters: { 'PhoneNumber.$': '$.phoneNumber', 'Message.$': '$.message', MessageAttributes: { 'AWS.SNS.SMS.SenderID': { DataType: tasks.MessageAttributeDataType.STRING, 'StringValue.$': '$.senderId', }, }, }, iamResources: ['*'], iamAction: 'sns:Publish', }); //MapのイテレーターにSMS送信タスクを指定 sendingMap.iterator(sendingSMSTask); //ステートマシン new sfn.StateMachine(this, 'stateMachine', { definition: queryCrewsTable.next(sendingMap), }); } }
cdk deploy
してデプロイします。
マネジメントコンソールから確認すると、ステートマシンが作成できています。
ステートマシン定義は以下のようになります。
{ "StartAt": "queryCrewsTable", "States": { "queryCrewsTable": { "Next": "sendingMap", "Type": "Task", "ResultPath": "$.crews", "ResultSelector": { "Items.$": "$.Items" }, "Resource": "arn:aws:states:::aws-sdk:dynamodb:query", "Parameters": { "TableName": "crews", "IndexName": "areaId-index", "ExpressionAttributeNames": { "#areaId": "areaId" }, "ExpressionAttributeValues": { ":areaId": { "S.$": "$.areaId" } }, "KeyConditionExpression": "#areaId = :areaId" } }, "sendingMap": { "Type": "Map", "End": true, "Parameters": { "phoneNumber.$": "$$.Map.Item.Value.phoneNumber.S", "message.$": "$.message", "senderId.$": "$.senderId" }, "Iterator": { "StartAt": "sendingSMSTask", "States": { "sendingSMSTask": { "End": true, "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:sns:publish", "Parameters": { "PhoneNumber.$": "$.phoneNumber", "Message.$": "$.message", "MessageAttributes": { "AWS.SNS.SMS.SenderID": { "DataType": "String", "StringValue.$": "$.senderId" } } } } } }, "ItemsPath": "$.crews.Items" } } }
またステートマシンの実行ロールも合わせて作成されますが、そのインラインポリシーは下記のようになります。CDK定義上での各ステートマシンTaskごとのiamResources
およびiamAction
の指定がそのままポリシーとなっています。
{ "Version": "2012-10-17", "Statement": [ { "Action": "dynamodb:Query", "Resource": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/crews/index/areaId-index", "Effect": "Allow" }, { "Action": "sns:Publish", "Resource": "*", "Effect": "Allow" } ] }
動作
下記のJSONを指定して、ステートマシンを実行します。a001
のエリアにいるクルーにTaito-Ku
に移動するように通知する入力です。
{ "areaId": "a001", "message": "Please move to Taito-Ku.", "senderId": "AllocateApp" }
実行に成功しました。
それぞれのクルーの電話番号の端末でもSMSのメッセージが受信できています。
参考
- 詳細に設定されたアクセスコントロールのための IAM ポリシー条件の使用 - Amazon DynamoDB
- AWS SDK service integrations - AWS Step Functions
- AWS Step FunctionsステートマシンからMapステートを使用して複数のSMSメッセージを送信する(AWS CDK) | DevelopersIO
以上